package kafka

import (
	"errors"

	"github.com/Shopify/sarama"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/ctx"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/fx"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/log"
)

// ConsumerGroup is the consumer-group abstraction exposed by this package.
type ConsumerGroup interface {
	// Consume starts consuming the given topics, passing every received
	// message to handle.
	Consume(ctx ctx.Context, handle ConsumerFunc, topics []string) error

	// Close shuts the consumer group down.
	Close() error
}

// consumerGroup is the ConsumerGroup implementation backed by a sarama
// consumer group.
type consumerGroup struct {
	// consumerGroup is the wrapped sarama consumer group that Consume and
	// Close delegate to.
	consumerGroup sarama.ConsumerGroup
}

// Consume starts consuming the given topics in a background goroutine and
// returns immediately; the returned error is always nil.
//
// Every message is dispatched to handle through a ConsumerGroupHandler. The
// underlying sarama Consume call returns whenever a session ends, so it is
// re-invoked in a loop. The loop stops once ctx is done or the consumer group
// has been closed; any other session error is logged and consumption is
// retried.
func (c *consumerGroup) Consume(ctx ctx.Context, handle ConsumerFunc, topics []string) error {
	// The handler only carries the callback, so a single value serves every
	// session.
	handler := ConsumerGroupHandler{handle: handle}

	go func() {
		// NOTE(review): fx.RunSafe presumably shields the goroutine from
		// panics — confirm. Its results are discarded because no caller is
		// left to receive them.
		_, _ = fx.RunSafe(func() error {
			for {
				err := c.consumerGroup.Consume(ctx, topics, handler)
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					// The group was closed; every further call would fail
					// immediately, so stop instead of spinning.
					return nil
				}
				if err != nil {
					log.Warn(ctx).Msgf("kafka consume error:%v", err)
				}
				if ctx.Err() != nil {
					// The caller cancelled the context: consumption is over.
					return nil
				}
			}
		})
	}()
	return nil
}

// Close closes the wrapped sarama consumer group and returns the error, if
// any, reported by it.
func (c *consumerGroup) Close() error {
	return c.consumerGroup.Close()
}

// ConsumerFunc is the callback invoked for each consumed message. It receives
// a context, the message key converted to a string, and a pointer to the
// message value converted to a string. When it returns a non-nil error,
// ConsumeClaim logs the failure and does not mark the message.
type ConsumerFunc func(c ctx.Context, key string, value *string) error

// ConsumerGroupHandler is the handler passed to the sarama consumer group; it
// forwards every claimed message to a ConsumerFunc.
type ConsumerGroupHandler struct {
	// handle is the callback invoked for each message in ConsumeClaim.
	handle ConsumerFunc
}

// Setup is the session-start hook of the sarama handler. It performs no work
// and always returns nil.
func (c ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup is the session-end hook of the sarama handler. It performs no work
// and always returns nil.
func (c ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes the messages of one claim until the claim's message
// channel is closed or the session context is done, then returns nil.
//
// Each message is handed to the configured ConsumerFunc together with a fresh
// context. On success the message is marked on the session. On failure the
// error is logged and the message is left unmarked.
//
// NOTE(review): a later successful message on the same claim still gets
// marked, which presumably moves the committed offset past the failed one —
// confirm that skipping failed messages is intended.
func (c ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			// The session is ending (e.g. rebalance or shutdown); return
			// promptly so the session can be torn down.
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				// The claim's channel was closed: nothing more to consume.
				return nil
			}
			valueStr := string(msg.Value)
			cc := ctx.New()
			if err := c.handle(cc, string(msg.Key), &valueStr); err != nil {
				log.Info(cc).Msgf("消费失败, err:%s, msg:%s", err.Error(), string(msg.Value))
				continue
			}
			session.MarkMessage(msg, "")
		}
	}
}

// NewConsumerGroup creates a new consumer group.
//
// The sarama configuration is built from config and saslConfig by
// saramaConfig, and the group is opened against addrs under groupID. If
// sarama fails to create the group, that error is returned unchanged together
// with a nil ConsumerGroup.
func NewConsumerGroup(addrs []string, groupID string, config *Config, saslConfig *KaConfig) (ConsumerGroup, error) {
	cfg := saramaConfig(config, saslConfig)

	inner, err := sarama.NewConsumerGroup(addrs, groupID, cfg)
	if err != nil {
		return nil, err
	}

	wrapped := &consumerGroup{consumerGroup: inner}
	return wrapped, nil
}
